Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Support for Handling Missing Data in Anomaly Detection #1274

Merged
merged 3 commits into from
Aug 17, 2024

Conversation

kaituo
Copy link
Collaborator

@kaituo kaituo commented Aug 8, 2024

Description

This PR introduces enhanced handling of missing data, giving customers the flexibility to choose how to address gaps in their data. Options include ignoring missing data (default behavior), filling with fixed values (customer-specified), zeros, or previous values. These options can improve recall in anomaly detection scenarios. For example, in this forum discussion https://forum.opensearch.org/t/do-missing-buckets-ruin-anomaly-detection/16535, customers can now opt to fill missing values with zeros to maintain detection accuracy.

Key Changes:

  1. Enhanced Missing Data Handling:

Changed to ThresholdedRandomCutForest.process(double[] inputPoint, long timestamp, int[] missingValues) to support missing data in both real-time and historical analyses. The preview mode remains unchanged for efficiency, utilizing existing linear imputation techniques. (See classes: ADColdStart, ModelColdStart, ModelManager, ADBatchTaskRunner).

  1. Refactoring Imputation & Processing:

Refactored the imputation process, failure handling, statistics collection, and result saving in Inferencer.

  1. Improved Imputed Value Reconstruction:

Reconstructed imputed values using existing mean and standard deviation, ensuring they are accurately stored in AnomalyResult. Added a featureImputed boolean tag to flag imputed values. (See class: AnomalyResult).

  1. Broadcast Support for HC Detectors:

Added a broadcast mechanism for HC detectors to identify entity models that haven’t received data in a given interval. This ensures models in memory process all relevant data before imputation begins. Single stream detectors handle this within existing transport messages. (See classes: ADHCImputeTransportAction, ADResultProcessor, ResultProcessor).

  1. Introduction of ActionListenerExecutor:

Added ActionListenerExecutor to wrap response and failure handlers in an ActionListener, executing them asynchronously using the provided ExecutorService. This allows us to handle responses in the AD thread pool.

  1. Also, this PR followed Fix linux build CI error due to action runner env upgrade node 20 k-NN#1795 to fix CI.

Testing:
Comprehensive testing was conducted, including both integration and unit tests. Of the 7177 lines added and 1685 lines removed, 4926 additions and 749 deletions are in tests, ensuring robust coverage.

Check List

  • New functionality includes testing.
  • New functionality has been documented.
  • API changes companion pull request created.
  • Commits are signed per the DCO using --signoff.
  • Public documentation issue/PR created.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Copy link

codecov bot commented Aug 9, 2024

Codecov Report

Attention: Patch coverage is 76.16192% with 159 lines in your changes missing coverage. Please review.

Project coverage is 77.60%. Comparing base (3f0fc8c) to head (7814c6d).
Report is 7 commits behind head on main.

Files Patch % Lines
...in/java/org/opensearch/ad/model/AnomalyResult.java 47.22% 14 Missing and 5 partials ⚠️
...n/java/org/opensearch/timeseries/model/Config.java 22.72% 13 Missing and 4 partials ⚠️
...ensearch/timeseries/transport/ResultProcessor.java 81.33% 9 Missing and 5 partials ⚠️
...org/opensearch/ad/transport/ADHCImputeRequest.java 45.45% 12 Missing ⚠️
...g/opensearch/timeseries/ml/RealTimeInferencer.java 83.56% 8 Missing and 4 partials ⚠️
...pensearch/ad/transport/ADHCImputeNodeResponse.java 26.66% 11 Missing ⚠️
...search/ad/transport/ADHCImputeTransportAction.java 79.41% 4 Missing and 3 partials ⚠️
...opensearch/ad/transport/ADHCImputeNodeRequest.java 40.00% 6 Missing ⚠️
.../java/org/opensearch/ad/ml/ThresholdingResult.java 66.66% 2 Missing and 3 partials ⚠️
...ensearch/ad/transport/ADHCImputeNodesResponse.java 28.57% 5 Missing ⚠️
... and 21 more
Additional details and impacted files

Impacted file tree graph

@@             Coverage Diff              @@
##               main    #1274      +/-   ##
============================================
+ Coverage     71.83%   77.60%   +5.77%     
- Complexity     4898     5436     +538     
============================================
  Files           518      532      +14     
  Lines         22879    23251     +372     
  Branches       2245     2301      +56     
============================================
+ Hits          16434    18043    +1609     
+ Misses         5410     4165    -1245     
- Partials       1035     1043       +8     
Flag Coverage Δ
plugin 77.60% <76.16%> (+5.77%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

Files Coverage Δ
.../java/org/opensearch/ad/AnomalyDetectorRunner.java 80.95% <100.00%> (+37.92%) ⬆️
...ava/org/opensearch/ad/ml/ADRealTimeInferencer.java 100.00% <100.00%> (ø)
.../org/opensearch/ad/model/ImputedFeatureResult.java 100.00% <100.00%> (ø)
...pensearch/ad/ratelimit/ADCheckpointReadWorker.java 100.00% <ø> (ø)
...rg/opensearch/ad/ratelimit/ADColdEntityWorker.java 100.00% <ø> (ø)
...org/opensearch/ad/ratelimit/ADColdStartWorker.java 100.00% <100.00%> (+64.28%) ⬆️
.../opensearch/ad/ratelimit/ADSaveResultStrategy.java 96.55% <ø> (+27.58%) ⬆️
.../handler/AbstractAnomalyDetectorActionHandler.java 97.82% <ø> (+77.82%) ⬆️
...tings/LegacyOpenDistroAnomalyDetectorSettings.java 100.00% <100.00%> (ø)
.../java/org/opensearch/ad/task/ADBatchTaskCache.java 96.29% <100.00%> (+0.06%) ⬆️
... and 71 more

... and 80 files with indirect coverage changes

@kaituo kaituo added feature new feature and removed infra Changes to infrastructure, testing, CI/CD, pipelines, etc. labels Aug 9, 2024
@kaituo
Copy link
Collaborator Author

kaituo commented Aug 9, 2024

testBackwardsCompatibility failed similiar to opensearch-project/k-NN#1622

Execution failed for task ':adBwcCluster#twoThirdsUpgradedClusterTask'. It is a core issue. Created an issue there: opensearch-project/OpenSearch#15234

org.opensearch.client.ResponseException: method [POST], host [http://[::1]:35211], URI [/_opendistro/_anomaly_detection/detectors/d9WnOJEBMKxU7iDLZox_/_start], status line [HTTP/1.1 500 Internal Server Error]
{"error":{"root_cause":[{"type":"status_exception","reason":"Fail to start detector"}],"type":"status_exception","reason":"Fail to start detector"},"status":500}
	at __randomizedtesting.SeedInfo.seed([4FABFE2200DFC164:A494C3EC7A0414EE]:0)
	at app//org.opensearch.client.RestClient.convertResponse(RestClient.java:453)
	at app//org.opensearch.client.RestClient.performRequest(RestClient.java:383)
	at app//org.opensearch.client.RestClient.performRequest(RestClient.java:358)
	at app//org.opensearch.timeseries.TestHelpers.makeRequest(TestHelpers.java:230)
	at app//org.opensearch.timeseries.TestHelpers.makeRequest(TestHelpers.java:203)
	at app//org.opensearch.ad.bwc.ADBackwardsCompatibilityIT.startAnomalyDetector(ADBackwardsCompatibilityIT.java:442)
	at app//org.opensearch.ad.bwc.ADBackwardsCompatibilityIT.createRealtimeAnomalyDetectorsAndStart(ADBackwardsCompatibilityIT.java:373)
	at app//org.opensearch.ad.bwc.ADBackwardsCompatibilityIT.testBackwardsCompatibility(ADBackwardsCompatibilityIT.java:174)
	at [email protected]/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
	at [email protected]/java.lang.reflect.Method.invoke(Method.java:580)
	at app//com.carrotsearch.randomizedtesting.RandomizedRunner.invoke(RandomizedRunner.java:1750)
	at app//com.carrotsearch.randomizedtesting.RandomizedRunner$8.evaluate(RandomizedRunner.java:938)
	at app//com.carrotsearch.randomizedtesting.RandomizedRunner$9.evaluate(RandomizedRunner.java:974)
	at app//com.carrotsearch.randomizedtesting.RandomizedRunner$10.evaluate(RandomizedRunner.java:988)
	at app//com.carrotsearch.randomizedtesting.rules.StatementAdapter.evaluate(StatementAdapter.java:36)
	at app//org.junit.rules.RunRules.evaluate(RunRules.java:20)
	at app//org.apache.lucene.tests.util.TestRuleSetupTeardownChained$1.evaluate(TestRuleSetupTeardownChained.java:48)
	at app//org.apache.lucene.tests.util.AbstractBeforeAfterRule$1.evaluate(AbstractBeforeAfterRule.java:43)
	at app//org.apache.lucene.tests.util.TestRuleThreadAndTestName$1.evaluate(TestRuleThreadAndTestName.java:45)
	at app//org.apache.lucene.tests.util.TestRuleIgnoreAfterMaxFailures$1.evaluate(TestRuleIgnoreAfterMaxFailures.java:60)
	at app//org.apache.lucene.tests.util.TestRuleMarkFailure$1.evaluate(TestRuleMarkFailure.java:44)
	at app//org.junit.rules.RunRules.evaluate(RunRules.java:20)
	at app//com.carrotsearch.randomizedtesting.rules.StatementAdapter.evaluate(StatementAdapter.java:36)
	at app//com.carrotsearch.randomizedtesting.ThreadLeakControl$StatementRunner.run(ThreadLeakControl.java:368)
	at app//com.carrotsearch.randomizedtesting.ThreadLeakControl.forkTimeoutingTask(ThreadLeakControl.java:817)
	at app//com.carrotsearch.randomizedtesting.ThreadLeakControl$3.evaluate(ThreadLeakControl.java:468)
	at app//com.carrotsearch.randomizedtesting.RandomizedRunner.runSingleTest(RandomizedRunner.java:947)
	at app//com.carrotsearch.randomizedtesting.RandomizedRunner$5.evaluate(RandomizedRunner.java:832)
	at app//com.carrotsearch.randomizedtesting.RandomizedRunner$6.evaluate(RandomizedRunner.java:883)
	at app//com.carrotsearch.randomizedtesting.RandomizedRunner$7.evaluate(RandomizedRunner.java:894)
	at app//org.apache.lucene.tests.util.AbstractBeforeAfterRule$1.evaluate(AbstractBeforeAfterRule.java:43)
	at app//com.carrotsearch.randomizedtesting.rules.StatementAdapter.evaluate(StatementAdapter.java:36)
	at app//org.apache.lucene.tests.util.TestRuleStoreClassName$1.evaluate(TestRuleStoreClassName.java:38)
	at app//com.carrotsearch.randomizedtesting.rules.NoShadowingOrOverridesOnMethodsRule$1.evaluate(NoShadowingOrOverridesOnMethodsRule.java:40)
	at app//com.carrotsearch.randomizedtesting.rules.NoShadowingOrOverridesOnMethodsRule$1.evaluate(NoShadowingOrOverridesOnMethodsRule.java:40)
	at app//com.carrotsearch.randomizedtesting.rules.StatementAdapter.evaluate(StatementAdapter.java:36)
	at app//com.carrotsearch.randomizedtesting.rules.StatementAdapter.evaluate(StatementAdapter.java:36)
	at app//org.apache.lucene.tests.util.TestRuleAssertionsRequired$1.evaluate(TestRuleAssertionsRequired.java:53)
	at app//org.apache.lucene.tests.util.AbstractBeforeAfterRule$1.evaluate(AbstractBeforeAfterRule.java:43)
	at app//org.apache.lucene.tests.util.TestRuleMarkFailure$1.evaluate(TestRuleMarkFailure.java:44)
	at app//org.apache.lucene.tests.util.TestRuleIgnoreAfterMaxFailures$1.evaluate(TestRuleIgnoreAfterMaxFailures.java:60)
	at app//org.apache.lucene.tests.util.TestRuleIgnoreTestSuites$1.evaluate(TestRuleIgnoreTestSuites.java:47)
	at app//org.junit.rules.RunRules.evaluate(RunRules.java:20)
	at app//com.carrotsearch.randomizedtesting.rules.StatementAdapter.evaluate(StatementAdapter.java:36)
	at app//com.carrotsearch.randomizedtesting.ThreadLeakControl$StatementRunner.run(ThreadLeakControl.java:368)
	at [email protected]/java.lang.Thread.run(Thread.java:1583)

[2024-08-09T12:39:40,606][ERROR][o.o.t.t.TaskManager      ] [adBwcCluster0-2] Failed to search task for config d9WnOJEBMKxU7iDLZox_
org.opensearch.action.search.SearchPhaseExecutionException: all shards failed
	at org.opensearch.action.search.AbstractSearchAsyncAction.onPhaseFailure(AbstractSearchAsyncAction.java:770) [opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.action.search.AbstractSearchAsyncAction.executeNextPhase(AbstractSearchAsyncAction.java:395) [opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.action.search.AbstractSearchAsyncAction.onPhaseDone(AbstractSearchAsyncAction.java:810) [opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.action.search.AbstractSearchAsyncAction.onShardFailure(AbstractSearchAsyncAction.java:548) [opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.action.search.AbstractSearchAsyncAction$1.onFailure(AbstractSearchAsyncAction.java:316) [opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.action.search.SearchExecutionStatsCollector.onFailure(SearchExecutionStatsCollector.java:104) [opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.action.ActionListenerResponseHandler.handleException(ActionListenerResponseHandler.java:75) [opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.action.search.SearchTransportService$ConnectionCountingHandler.handleException(SearchTransportService.java:766) [opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.transport.TransportService$9.handleException(TransportService.java:1729) [opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.transport.TransportService$ContextRestoreResponseHandler.handleException(TransportService.java:1515) [opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.transport.NativeMessageHandler.lambda$handleException$5(NativeMessageHandler.java:454) [opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.common.util.concurrent.OpenSearchExecutors$DirectExecutorService.execute(OpenSearchExecutors.java:343) [opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.transport.NativeMessageHandler.handleException(NativeMessageHandler.java:452) [opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.transport.NativeMessageHandler.handlerResponseError(NativeMessageHandler.java:444) [opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.transport.NativeMessageHandler.handleMessage(NativeMessageHandler.java:172) [opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.transport.NativeMessageHandler.messageReceived(NativeMessageHandler.java:126) [opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.transport.InboundHandler.messageReceivedFromPipeline(InboundHandler.java:121) [opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.transport.InboundHandler.inboundMessage(InboundHandler.java:113) [opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.transport.TcpTransport.inboundMessage(TcpTransport.java:800) [opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.transport.nativeprotocol.NativeInboundBytesHandler.forwardFragments(NativeInboundBytesHandler.java:157) [opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.transport.nativeprotocol.NativeInboundBytesHandler.doHandleBytes(NativeInboundBytesHandler.java:94) [opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.transport.InboundPipeline.doHandleBytes(InboundPipeline.java:143) [opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.transport.InboundPipeline.handleBytes(InboundPipeline.java:119) [opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.transport.netty4.Netty4MessageChannelHandler.channelRead(Netty4MessageChannelHandler.java:95) [transport-netty4-client-2.17.0.jar:2.17.0]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
	at io.netty.handler.logging.LoggingHandler.channelRead(LoggingHandler.java:280) [netty-handler-4.1.112.Final.jar:4.1.112.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1407) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:918) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysPlain(NioEventLoop.java:689) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:652) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:994) [netty-common-4.1.112.Final.jar:4.1.112.Final]
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [netty-common-4.1.112.Final.jar:4.1.112.Final]
	at java.base/java.lang.Thread.run(Thread.java:1583) [?:?]
Caused by: org.opensearch.OpenSearchException$3: unexpected byte [0xb9]
	at org.opensearch.OpenSearchException.guessRootCauses(OpenSearchException.java:710) ~[opensearch-core-2.17.0.jar:2.17.0]
	at org.opensearch.action.search.AbstractSearchAsyncAction.executeNextPhase(AbstractSearchAsyncAction.java:393) [opensearch-2.17.0.jar:2.17.0]
	... 41 more
Caused by: java.lang.IllegalStateException: unexpected byte [0xb9]
	at org.opensearch.core.common.io.stream.StreamInput.readBoolean(StreamInput.java:593) ~[opensearch-core-2.17.0.jar:2.17.0]
	at org.opensearch.core.common.io.stream.StreamInput.readBoolean(StreamInput.java:583) ~[opensearch-core-2.17.0.jar:2.17.0]
	at org.opensearch.core.common.io.stream.StreamInput.readOptionalString(StreamInput.java:373) ~[opensearch-core-2.17.0.jar:2.17.0]
	at org.opensearch.search.internal.ShardSearchRequest.<init>(ShardSearchRequest.java:260) ~[opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.transport.RequestHandlerRegistry.newRequest(RequestHandlerRegistry.java:85) ~[opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.transport.InboundHandler.newRequest(InboundHandler.java:309) ~[opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.transport.InboundHandler.handleRequest(InboundHandler.java:264) ~[opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.transport.InboundHandler.messageReceived(InboundHandler.java:144) ~[opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.transport.InboundHandler.inboundMessage(InboundHandler.java:127) ~[opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.transport.TcpTransport.inboundMessage(TcpTransport.java:770) ~[opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.transport.InboundPipeline.forwardFragments(InboundPipeline.java:175) ~[opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.transport.InboundPipeline.doHandleBytes(InboundPipeline.java:150) ~[opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.transport.InboundPipeline.handleBytes(InboundPipeline.java:115) ~[opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.transport.netty4.Netty4MessageChannelHandler.channelRead(Netty4MessageChannelHandler.java:95) ~[?:?]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[?:?]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[?:?]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[?:?]
	at io.netty.handler.logging.LoggingHandler.channelRead(LoggingHandler.java:280) ~[?:?]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[?:?]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[?:?]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[?:?]
	at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) ~[?:?]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[?:?]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[?:?]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[?:?]
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[?:?]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[?:?]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[?:?]
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[?:?]
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[?:?]
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) ~[?:?]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysPlain(NioEventLoop.java:689) ~[?:?]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:652) ~[?:?]
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) ~[?:?]
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[?:?]
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[?:?]
	at java.lang.Thread.run(Thread.java:1583) ~[?:?]
[2024-08-09T12:39:40,613][ERROR][o.o.t.u.RestHandlerUtils ] [adBwcCluster0-2] Wrap exception before sending back to user
org.opensearch.action.search.SearchPhaseExecutionException: all shards failed
	at org.opensearch.action.search.AbstractSearchAsyncAction.onPhaseFailure(AbstractSearchAsyncAction.java:770) [opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.action.search.AbstractSearchAsyncAction.executeNextPhase(AbstractSearchAsyncAction.java:395) [opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.action.search.AbstractSearchAsyncAction.onPhaseDone(AbstractSearchAsyncAction.java:810) [opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.action.search.AbstractSearchAsyncAction.onShardFailure(AbstractSearchAsyncAction.java:548) [opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.action.search.AbstractSearchAsyncAction$1.onFailure(AbstractSearchAsyncAction.java:316) [opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.action.search.SearchExecutionStatsCollector.onFailure(SearchExecutionStatsCollector.java:104) [opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.action.ActionListenerResponseHandler.handleException(ActionListenerResponseHandler.java:75) [opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.action.search.SearchTransportService$ConnectionCountingHandler.handleException(SearchTransportService.java:766) [opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.transport.TransportService$9.handleException(TransportService.java:1729) [opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.transport.TransportService$ContextRestoreResponseHandler.handleException(TransportService.java:1515) [opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.transport.NativeMessageHandler.lambda$handleException$5(NativeMessageHandler.java:454) [opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.common.util.concurrent.OpenSearchExecutors$DirectExecutorService.execute(OpenSearchExecutors.java:343) [opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.transport.NativeMessageHandler.handleException(NativeMessageHandler.java:452) [opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.transport.NativeMessageHandler.handlerResponseError(NativeMessageHandler.java:444) [opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.transport.NativeMessageHandler.handleMessage(NativeMessageHandler.java:172) [opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.transport.NativeMessageHandler.messageReceived(NativeMessageHandler.java:126) [opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.transport.InboundHandler.messageReceivedFromPipeline(InboundHandler.java:121) [opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.transport.InboundHandler.inboundMessage(InboundHandler.java:113) [opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.transport.TcpTransport.inboundMessage(TcpTransport.java:800) [opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.transport.nativeprotocol.NativeInboundBytesHandler.forwardFragments(NativeInboundBytesHandler.java:157) [opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.transport.nativeprotocol.NativeInboundBytesHandler.doHandleBytes(NativeInboundBytesHandler.java:94) [opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.transport.InboundPipeline.doHandleBytes(InboundPipeline.java:143) [opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.transport.InboundPipeline.handleBytes(InboundPipeline.java:119) [opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.transport.netty4.Netty4MessageChannelHandler.channelRead(Netty4MessageChannelHandler.java:95) [transport-netty4-client-2.17.0.jar:2.17.0]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
	at io.netty.handler.logging.LoggingHandler.channelRead(LoggingHandler.java:280) [netty-handler-4.1.112.Final.jar:4.1.112.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1407) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:918) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysPlain(NioEventLoop.java:689) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:652) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:994) [netty-common-4.1.112.Final.jar:4.1.112.Final]
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [netty-common-4.1.112.Final.jar:4.1.112.Final]
	at java.base/java.lang.Thread.run(Thread.java:1583) [?:?]
Caused by: org.opensearch.OpenSearchException$3: unexpected byte [0xb9]
	at org.opensearch.OpenSearchException.guessRootCauses(OpenSearchException.java:710) ~[opensearch-core-2.17.0.jar:2.17.0]
	at org.opensearch.action.search.AbstractSearchAsyncAction.executeNextPhase(AbstractSearchAsyncAction.java:393) [opensearch-2.17.0.jar:2.17.0]
	... 41 more
Caused by: java.lang.IllegalStateException: unexpected byte [0xb9]
	at org.opensearch.core.common.io.stream.StreamInput.readBoolean(StreamInput.java:593) ~[opensearch-core-2.17.0.jar:2.17.0]
	at org.opensearch.core.common.io.stream.StreamInput.readBoolean(StreamInput.java:583) ~[opensearch-core-2.17.0.jar:2.17.0]
	at org.opensearch.core.common.io.stream.StreamInput.readOptionalString(StreamInput.java:373) ~[opensearch-core-2.17.0.jar:2.17.0]
	at org.opensearch.search.internal.ShardSearchRequest.<init>(ShardSearchRequest.java:260) ~[opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.transport.RequestHandlerRegistry.newRequest(RequestHandlerRegistry.java:85) ~[opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.transport.InboundHandler.newRequest(InboundHandler.java:309) ~[opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.transport.InboundHandler.handleRequest(InboundHandler.java:264) ~[opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.transport.InboundHandler.messageReceived(InboundHandler.java:144) ~[opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.transport.InboundHandler.inboundMessage(InboundHandler.java:127) ~[opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.transport.TcpTransport.inboundMessage(TcpTransport.java:770) ~[opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.transport.InboundPipeline.forwardFragments(InboundPipeline.java:175) ~[opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.transport.InboundPipeline.doHandleBytes(InboundPipeline.java:150) ~[opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.transport.InboundPipeline.handleBytes(InboundPipeline.java:115) ~[opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.transport.netty4.Netty4MessageChannelHandler.channelRead(Netty4MessageChannelHandler.java:95) ~[?:?]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[?:?]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[?:?]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[?:?]
	at io.netty.handler.logging.LoggingHandler.channelRead(LoggingHandler.java:280) ~[?:?]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[?:?]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[?:?]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[?:?]
	at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) ~[?:?]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[?:?]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[?:?]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[?:?]
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[?:?]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[?:?]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[?:?]
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[?:?]
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[?:?]
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) ~[?:?]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysPlain(NioEventLoop.java:689) ~[?:?]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:652) ~[?:?]
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) ~[?:?]
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[?:?]
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[?:?]
	at java.lang.Thread.run(Thread.java:1583) ~[?:?]
[2024-08-09T12:39:40,621][WARN ][r.suppressed             ] [adBwcCluster0-2] path: /_opendistro/_anomaly_detection/detectors/d9WnOJEBMKxU7iDLZox_/_start, params: {detectorID=d9WnOJEBMKxU7iDLZox_}
org.opensearch.OpenSearchStatusException: Fail to start detector
	at org.opensearch.timeseries.util.RestHandlerUtils.lambda$wrapRestActionListener$2(RestHandlerUtils.java:243) [opensearch-anomaly-detection-2.17.0.0.jar:2.17.0.0]
	at org.opensearch.core.action.ActionListener$1.onFailure(ActionListener.java:90) [opensearch-core-2.17.0.jar:2.17.0]
	at org.opensearch.core.action.ActionListener$1.onFailure(ActionListener.java:90) [opensearch-core-2.17.0.jar:2.17.0]
	at org.opensearch.timeseries.rest.handler.IndexJobActionHandler.lambda$onGetJobForWrite$10(IndexJobActionHandler.java:306) [opensearch-anomaly-detection-2.17.0.0.jar:2.17.0.0]
	at org.opensearch.core.action.ActionListener$1.onFailure(ActionListener.java:90) [opensearch-core-2.17.0.jar:2.17.0]
	at org.opensearch.timeseries.task.TaskManager.lambda$getAndExecuteOnLatestTasks$17(TaskManager.java:598) [opensearch-anomaly-detection-2.17.0.0.jar:2.17.0.0]
	at org.opensearch.core.action.ActionListener$1.onFailure(ActionListener.java:90) [opensearch-core-2.17.0.jar:2.17.0]
	at org.opensearch.action.support.TransportAction$1.onFailure(TransportAction.java:124) [opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.core.action.ActionListener$5.onFailure(ActionListener.java:277) [opensearch-core-2.17.0.jar:2.17.0]
	at org.opensearch.action.search.AbstractSearchAsyncAction.raisePhaseFailure(AbstractSearchAsyncAction.java:797) [opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.action.search.AbstractSearchAsyncAction.onPhaseFailure(AbstractSearchAsyncAction.java:770) [opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.action.search.AbstractSearchAsyncAction.executeNextPhase(AbstractSearchAsyncAction.java:395) [opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.action.search.AbstractSearchAsyncAction.onPhaseDone(AbstractSearchAsyncAction.java:810) [opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.action.search.AbstractSearchAsyncAction.onShardFailure(AbstractSearchAsyncAction.java:548) [opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.action.search.AbstractSearchAsyncAction$1.onFailure(AbstractSearchAsyncAction.java:316) [opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.action.search.SearchExecutionStatsCollector.onFailure(SearchExecutionStatsCollector.java:104) [opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.action.ActionListenerResponseHandler.handleException(ActionListenerResponseHandler.java:75) [opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.action.search.SearchTransportService$ConnectionCountingHandler.handleException(SearchTransportService.java:766) [opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.transport.TransportService$9.handleException(TransportService.java:1729) [opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.transport.TransportService$ContextRestoreResponseHandler.handleException(TransportService.java:1515) [opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.transport.NativeMessageHandler.lambda$handleException$5(NativeMessageHandler.java:454) [opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.common.util.concurrent.OpenSearchExecutors$DirectExecutorService.execute(OpenSearchExecutors.java:343) [opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.transport.NativeMessageHandler.handleException(NativeMessageHandler.java:452) [opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.transport.NativeMessageHandler.handlerResponseError(NativeMessageHandler.java:444) [opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.transport.NativeMessageHandler.handleMessage(NativeMessageHandler.java:172) [opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.transport.NativeMessageHandler.messageReceived(NativeMessageHandler.java:126) [opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.transport.InboundHandler.messageReceivedFromPipeline(InboundHandler.java:121) [opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.transport.InboundHandler.inboundMessage(InboundHandler.java:113) [opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.transport.TcpTransport.inboundMessage(TcpTransport.java:800) [opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.transport.nativeprotocol.NativeInboundBytesHandler.forwardFragments(NativeInboundBytesHandler.java:157) [opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.transport.nativeprotocol.NativeInboundBytesHandler.doHandleBytes(NativeInboundBytesHandler.java:94) [opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.transport.InboundPipeline.doHandleBytes(InboundPipeline.java:143) [opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.transport.InboundPipeline.handleBytes(InboundPipeline.java:119) [opensearch-2.17.0.jar:2.17.0]
	at org.opensearch.transport.netty4.Netty4MessageChannelHandler.channelRead(Netty4MessageChannelHandler.java:95) [transport-netty4-client-2.17.0.jar:2.17.0]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
	at io.netty.handler.logging.LoggingHandler.channelRead(LoggingHandler.java:280) [netty-handler-4.1.112.Final.jar:4.1.112.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1407) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:918) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysPlain(NioEventLoop.java:689) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:652) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:994) [netty-common-4.1.112.Final.jar:4.1.112.Final]
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [netty-common-4.1.112.Final.jar:4.1.112.Final]
	at java.base/java.lang.Thread.run(Thread.java:1583) [?:?]
[2024-08-09T12:39:41,601][INFO ][o.o.i.r.RecoverySourceHandler] [adBwcCluster0-2] [test_data_for_ad_plugin][0][recover to adBwcCluster0-1] finalizing recovery took [7.1ms]
(base)
(24-08-09 13:54:36) <0> [~]
dev-dsk-kaituo-2b-bf84c4db %

Comment on lines +226 to +227
for (int i = 0; i < pointSamples.size(); i++) {
Sample dataSample = pointSamples.get(i);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can be replaced with for-each loop

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's hard to say which one is better. https://programmerr47.medium.com/to-index-or-iterate-7b81039e5484 shows indexed loop is faster than for-each loop.

This PR introduces enhanced handling of missing data, giving customers the flexibility to choose how to address gaps in their data. Options include ignoring missing data (default behavior), filling with fixed values (customer-specified), zeros, or previous values. These options can improve recall in anomaly detection scenarios. For example, in this forum discussion https://forum.opensearch.org/t/do-missing-buckets-ruin-anomaly-detection/16535, customers can now opt to fill missing values with zeros to maintain detection accuracy.

Key Changes:
1. Enhanced Missing Data Handling:

Changed to ThresholdedRandomCutForest.process(double[] inputPoint, long timestamp, int[] missingValues) to support missing data in both real-time and historical analyses. The preview mode remains unchanged for efficiency, utilizing existing linear imputation techniques. (See classes: ADColdStart, ModelColdStart, ModelManager, ADBatchTaskRunner).

2. Refactoring Imputation & Processing:

Refactored the imputation process, failure handling, statistics collection, and result saving in Inferencer.

3. Improved Imputed Value Reconstruction:

Reconstructed imputed values using existing mean and standard deviation, ensuring they are accurately stored in AnomalyResult. Added a featureImputed boolean tag to flag imputed values. (See class: AnomalyResult).

4. Broadcast Support for HC Detectors:

Added a broadcast mechanism for HC detectors to identify entity models that haven’t received data in a given interval. This ensures models in memory process all relevant data before imputation begins. Single stream detectors handle this within existing transport messages. (See classes: ADHCImputeTransportAction, ADResultProcessor, ResultProcessor).

5. Introduction of ActionListenerExecutor:

Added ActionListenerExecutor to wrap response and failure handlers in an ActionListener, executing them asynchronously using the provided ExecutorService. This allows us to handle responses in the AD thread pool.

Testing:
Comprehensive testing was conducted, including both integration and unit tests. Of the 7135 lines added and 1683 lines removed, 4926 additions and 749 deletions are in tests, ensuring robust coverage.

Signed-off-by: Kaituo Li <[email protected]>
Signed-off-by: Kaituo Li <[email protected]>
@kaituo
Copy link
Collaborator Author

kaituo commented Aug 14, 2024

Whitesource check does not run and it is a known issue to infra team.

@@ -147,23 +145,22 @@ public <RCFDescriptor extends AnomalyDescriptor> IntermediateResultType score(
if (!modelState.getSamples().isEmpty()) {
for (Sample unProcessedSample : modelState.getSamples()) {
// we are sure that the process method will indeed return an instance of RCFDescriptor.
rcfModel.process(unProcessedSample.getValueList(), unProcessedSample.getDataEndTime().getEpochSecond());
double[] unProcessedPoint = unProcessedSample.getValueList();
int[] missingIndices = DataUtil.generateMissingIndicesArray(unProcessedPoint);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are these missing indices referring to the features that don't have values?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes.

double[] toScore = null;
if (dataPoint.isEmpty()) {
toScore = new double[detector.getEnabledFeatureIds().size()];
Arrays.fill(toScore, Double.NaN);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we filling it here with Double.NaN and not the filledValues, for example the fixed value or previous value, is this done elsewhere.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Double.NaN is used to signal we should put the corresponding indices in the missing value array. RCF will fill in fixed value or previous value according to the missing value array.


import com.amazon.randomcutforest.parkservices.ThresholdedRandomCutForest;

public class ADHCImputeTransportAction extends
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a little more comments in this class, I was a little confused on the broadcasting and why we are sending NaN values. Or how do we know which node to has which entities

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added comment:

/**

  • This class manages the broadcasting mechanism and entity data processing for
  • the HC detector. The system broadcasts a message after processing all records
  • in each interval to ensure that each node examines its hot models in memory
  • and determines which entity models have not received data during the current interval.
  • "Hot" entities refer to those models actively loaded in memory, as opposed to
  • "cold" models, which are not loaded and remain in storage due to limited memory resources.
  • Upon receiving the broadcast message, each node checks whether each hot entity
  • has received new data. If a hot entity has not received any data, the system
  • assigns a NaN value to that entity. This NaN value signals to the model that no
  • data was received, prompting it to impute the missing value based on previous data,
  • rather than using current interval data.
  • The system determines which node manages which entities based on memory availability.
  • The coordinating node does not immediately know which entities are hot or cold;
  • it learns this during the pagination process. Hot entities are those that have
  • recently received data and are actively maintained in memory, while cold entities
  • remain in storage and are processed only if time permits within the interval.
  • For cold entities whose models are not loaded in memory, the system does not
  • produce an anomaly result for that interval due to insufficient time or resources
  • to process them. This is particularly relevant in scenarios with short intervals,
  • such as one minute, where an underscaled cluster may cause processing delays
  • that prevent timely anomaly detection for some entities.
    */

* @param executorService the ExecutorService used to execute the onResponse handler asynchronously
* @return an ActionListener that handles the response and failure cases
*/
public static <Response> ActionListener<Response> wrap(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the only difference with this added that we are making sure only to use AD thread pool?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes

@kaituo kaituo merged commit dc85dc4 into opensearch-project:main Aug 17, 2024
18 checks passed
@opensearch-trigger-bot
Copy link

The backport to 2.x failed:

The process '/usr/bin/git' failed with exit code 128

To backport manually, run these commands in your terminal:

# Navigate to the root of your repository
cd $(git rev-parse --show-toplevel)
# Fetch latest updates from GitHub
git fetch
# Create a new working tree
git worktree add ../.worktrees/anomaly-detection/backport-2.x 2.x
# Navigate to the new working tree
pushd ../.worktrees/anomaly-detection/backport-2.x
# Create a new branch
git switch --create backport/backport-1274-to-2.x
# Cherry-pick the merged commit of this pull request and resolve the conflicts
git cherry-pick -x --mainline 1 dc85dc4e97dc1fc14a5d367072c6a40dbec2ee7c
# Push it to GitHub
git push --set-upstream origin backport/backport-1274-to-2.x
# Go back to the original working tree
popd
# Delete the working tree
git worktree remove ../.worktrees/anomaly-detection/backport-2.x

Then, create a pull request where the base branch is 2.x and the compare/head branch is backport/backport-1274-to-2.x.

kaituo added a commit to kaituo/anomaly-detection-1 that referenced this pull request Aug 19, 2024
…h-project#1274)

* Add Support for Handling Missing Data in Anomaly Detection

This PR introduces enhanced handling of missing data, giving customers the flexibility to choose how to address gaps in their data. Options include ignoring missing data (default behavior), filling with fixed values (customer-specified), zeros, or previous values. These options can improve recall in anomaly detection scenarios. For example, in this forum discussion https://forum.opensearch.org/t/do-missing-buckets-ruin-anomaly-detection/16535, customers can now opt to fill missing values with zeros to maintain detection accuracy.

Key Changes:
1. Enhanced Missing Data Handling:

Changed to ThresholdedRandomCutForest.process(double[] inputPoint, long timestamp, int[] missingValues) to support missing data in both real-time and historical analyses. The preview mode remains unchanged for efficiency, utilizing existing linear imputation techniques. (See classes: ADColdStart, ModelColdStart, ModelManager, ADBatchTaskRunner).

2. Refactoring Imputation & Processing:

Refactored the imputation process, failure handling, statistics collection, and result saving in Inferencer.

3. Improved Imputed Value Reconstruction:

Reconstructed imputed values using existing mean and standard deviation, ensuring they are accurately stored in AnomalyResult. Added a featureImputed boolean tag to flag imputed values. (See class: AnomalyResult).

4. Broadcast Support for HC Detectors:

Added a broadcast mechanism for HC detectors to identify entity models that haven’t received data in a given interval. This ensures models in memory process all relevant data before imputation begins. Single stream detectors handle this within existing transport messages. (See classes: ADHCImputeTransportAction, ADResultProcessor, ResultProcessor).

5. Introduction of ActionListenerExecutor:

Added ActionListenerExecutor to wrap response and failure handlers in an ActionListener, executing them asynchronously using the provided ExecutorService. This allows us to handle responses in the AD thread pool.

Testing:
Comprehensive testing was conducted, including both integration and unit tests. Of the 7135 lines added and 1683 lines removed, 4926 additions and 749 deletions are in tests, ensuring robust coverage.

Signed-off-by: Kaituo Li <[email protected]>

* rebase from main

Signed-off-by: Kaituo Li <[email protected]>

* add comment and remove redundant code

Signed-off-by: Kaituo Li <[email protected]>

---------

Signed-off-by: Kaituo Li <[email protected]>
kaituo added a commit to kaituo/anomaly-detection-1 that referenced this pull request Aug 19, 2024
…h-project#1274)

* Add Support for Handling Missing Data in Anomaly Detection

This PR introduces enhanced handling of missing data, giving customers the flexibility to choose how to address gaps in their data. Options include ignoring missing data (default behavior), filling with fixed values (customer-specified), zeros, or previous values. These options can improve recall in anomaly detection scenarios. For example, in this forum discussion https://forum.opensearch.org/t/do-missing-buckets-ruin-anomaly-detection/16535, customers can now opt to fill missing values with zeros to maintain detection accuracy.

Key Changes:
1. Enhanced Missing Data Handling:

Changed to ThresholdedRandomCutForest.process(double[] inputPoint, long timestamp, int[] missingValues) to support missing data in both real-time and historical analyses. The preview mode remains unchanged for efficiency, utilizing existing linear imputation techniques. (See classes: ADColdStart, ModelColdStart, ModelManager, ADBatchTaskRunner).

2. Refactoring Imputation & Processing:

Refactored the imputation process, failure handling, statistics collection, and result saving in Inferencer.

3. Improved Imputed Value Reconstruction:

Reconstructed imputed values using existing mean and standard deviation, ensuring they are accurately stored in AnomalyResult. Added a featureImputed boolean tag to flag imputed values. (See class: AnomalyResult).

4. Broadcast Support for HC Detectors:

Added a broadcast mechanism for HC detectors to identify entity models that haven’t received data in a given interval. This ensures models in memory process all relevant data before imputation begins. Single stream detectors handle this within existing transport messages. (See classes: ADHCImputeTransportAction, ADResultProcessor, ResultProcessor).

5. Introduction of ActionListenerExecutor:

Added ActionListenerExecutor to wrap response and failure handlers in an ActionListener, executing them asynchronously using the provided ExecutorService. This allows us to handle responses in the AD thread pool.

Testing:
Comprehensive testing was conducted, including both integration and unit tests. Of the 7135 lines added and 1683 lines removed, 4926 additions and 749 deletions are in tests, ensuring robust coverage.

Signed-off-by: Kaituo Li <[email protected]>

* rebase from main

Signed-off-by: Kaituo Li <[email protected]>

* add comment and remove redundant code

Signed-off-by: Kaituo Li <[email protected]>

---------

Signed-off-by: Kaituo Li <[email protected]>
kaituo added a commit that referenced this pull request Aug 19, 2024
…1281)

* Add Support for Handling Missing Data in Anomaly Detection

This PR introduces enhanced handling of missing data, giving customers the flexibility to choose how to address gaps in their data. Options include ignoring missing data (default behavior), filling with fixed values (customer-specified), zeros, or previous values. These options can improve recall in anomaly detection scenarios. For example, in this forum discussion https://forum.opensearch.org/t/do-missing-buckets-ruin-anomaly-detection/16535, customers can now opt to fill missing values with zeros to maintain detection accuracy.

Key Changes:
1. Enhanced Missing Data Handling:

Changed to ThresholdedRandomCutForest.process(double[] inputPoint, long timestamp, int[] missingValues) to support missing data in both real-time and historical analyses. The preview mode remains unchanged for efficiency, utilizing existing linear imputation techniques. (See classes: ADColdStart, ModelColdStart, ModelManager, ADBatchTaskRunner).

2. Refactoring Imputation & Processing:

Refactored the imputation process, failure handling, statistics collection, and result saving in Inferencer.

3. Improved Imputed Value Reconstruction:

Reconstructed imputed values using existing mean and standard deviation, ensuring they are accurately stored in AnomalyResult. Added a featureImputed boolean tag to flag imputed values. (See class: AnomalyResult).

4. Broadcast Support for HC Detectors:

Added a broadcast mechanism for HC detectors to identify entity models that haven’t received data in a given interval. This ensures models in memory process all relevant data before imputation begins. Single stream detectors handle this within existing transport messages. (See classes: ADHCImputeTransportAction, ADResultProcessor, ResultProcessor).

5. Introduction of ActionListenerExecutor:

Added ActionListenerExecutor to wrap response and failure handlers in an ActionListener, executing them asynchronously using the provided ExecutorService. This allows us to handle responses in the AD thread pool.

Testing:
Comprehensive testing was conducted, including both integration and unit tests. Of the 7135 lines added and 1683 lines removed, 4926 additions and 749 deletions are in tests, ensuring robust coverage.



* rebase from main



* add comment and remove redundant code



---------

Signed-off-by: Kaituo Li <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants